Conversation
| @Override | ||
| public String constructUpsertQuery(String table, String[] fieldNames, String[] listKeys) { | ||
| if (listKeys == null) { | ||
| throw new IllegalArgumentException("Column names to be updated should not be null"); |
There was a problem hiding this comment.
what is the name for this config in UI?
Column names does not help understand which config are we exactly talking about.
Also, can we move these to sink config validations when UPSERT is selected?
There was a problem hiding this comment.
Yes, we also have this validation in Abstract DB sink
There was a problem hiding this comment.
The naming convention is taken from ETLDBOutputFormat and PostgresETLDBOutputFormat
There was a problem hiding this comment.
if we have these validations, why will there be a point when code reaches here without these properties present?
| if (listKeys == null) { | ||
| throw new IllegalArgumentException("Column names to be updated should not be null"); | ||
| } else if (fieldNames == null) { | ||
| throw new IllegalArgumentException("Field names should not be null"); |
There was a problem hiding this comment.
similar comment here.
Better to move these to sink config validations to fail early.
There was a problem hiding this comment.
This is input schema
There was a problem hiding this comment.
so these validations should move to source.
| @Override | ||
| protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema, | ||
| String fieldName, int fieldIndex) throws SQLException { | ||
| int sqlType = columnTypes.get(fieldIndex).getType(); |
There was a problem hiding this comment.
In which case, is it possible that fieldIndex is greater than columnTypes.size()-1?
There was a problem hiding this comment.
The logic in common class adds col (that are to be updated) and then col (which we are going to match), this can lead to query having duplicate reference and the field index be more than the total col size.
| int sqlType = Types.OTHER; | ||
| // avoid OOB exception in case of mismatch between columnTypes and record schema fields | ||
| for (ColumnType columnType : columnTypes) { | ||
| if (columnType.getName().equals(fieldName)) { |
There was a problem hiding this comment.
should we be using equalsIgnoreCase like
There was a problem hiding this comment.
no, oracle has case sensitive col names
| String fieldName, int fieldIndex) throws SQLException { | ||
| int sqlType = columnTypes.get(fieldIndex).getType(); | ||
| int sqlIndex = fieldIndex + 1; | ||
| int sqlType = Types.OTHER; |
There was a problem hiding this comment.
should we be throwing an error if none of the fieldName matches in the columnType.getName() list?
Why are we assuming Types.OTHER when there is no match?
There was a problem hiding this comment.
It will fail for Types.OTHER, but throwing an error sound like better option.
This pull request introduces support for upsert operations in the Oracle batch sink plugin.
OracleETLDBOutputFormatthat extendsETLDBOutputFormatand implements theconstructUpsertQuerymethod, generating an Oracle-compatible MERGE-based upsert SQL statement.OracleSinkto use the newOracleETLDBOutputFormatby adding an output context withSinkOutputFormatProvider.UI Update